/*
 * Copyright (C) 2016-2019 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.remote.artery

import java.net.ConnectException
import java.util.Queue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference

import scala.annotation.tailrec
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.control.NoStackTrace

import akka.Done
import akka.NotUsed
import akka.actor.ActorRef
import akka.actor.ActorSelectionMessage
import akka.actor.Address
import akka.actor.Cancellable
import akka.actor.Dropped
import akka.dispatch.sysmsg.SystemMessage
import akka.event.Logging
import akka.remote.DaemonMsgCreate
import akka.remote.PriorityMessage
import akka.remote.RemoteActorRef
import akka.remote.UniqueAddress
import akka.remote.artery.ArteryTransport.AeronTerminated
import akka.remote.artery.ArteryTransport.ShuttingDown
import akka.remote.artery.Encoder.OutboundCompressionAccess
import akka.remote.artery.InboundControlJunction.ControlMessageSubject
import akka.remote.artery.OutboundControlJunction.OutboundControlIngress
import akka.remote.artery.OutboundHandshake.HandshakeTimeoutException
import akka.remote.artery.SystemMessageDelivery.ClearSystemMessageDelivery
import akka.remote.artery.aeron.AeronSink.GaveUpMessageException
import akka.remote.artery.compress.CompressionTable
import akka.stream.AbruptTerminationException
import akka.stream.KillSwitches
import akka.stream.Materializer
import akka.stream.SharedKillSwitch
import akka.stream.StreamTcpException
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.MergeHub
import akka.stream.scaladsl.Source
import akka.util.OptionVal
import akka.util.PrettyDuration._
import akka.util.Unsafe
import akka.util.WildcardIndex
import akka.util.ccompat._
import com.github.ghik.silencer.silent
import org.agrona.concurrent.ManyToOneConcurrentArrayQueue

/**
 * INTERNAL API
 */
private[remote] object Association {
  sealed trait QueueWrapper extends SendQueue.ProducerApi[OutboundEnvelope] {
    def queue: Queue[OutboundEnvelope]
  }

  final case class QueueWrapperImpl(queue: Queue[OutboundEnvelope]) extends QueueWrapper {
    override def offer(message: OutboundEnvelope): Boolean = queue.offer(message)

    override def isEnabled: Boolean = true
  }

  object DisabledQueueWrapper extends QueueWrapper {
    override def queue: java.util.Queue[OutboundEnvelope] =
      throw new UnsupportedOperationException("The Queue is disabled")

    override def offer(message: OutboundEnvelope): Boolean =
      throw new UnsupportedOperationException("The method offer() is illegal on a disabled queue")

    override def isEnabled: Boolean = false
  }

  object RemovedQueueWrapper extends QueueWrapper {
    override def queue: java.util.Queue[OutboundEnvelope] =
      throw new UnsupportedOperationException("The Queue is removed")

    override def offer(message: OutboundEnvelope): Boolean = false

    override def isEnabled: Boolean = false
  }

  final case class LazyQueueWrapper(queue: Queue[OutboundEnvelope], materialize: () => Unit) extends QueueWrapper {
    private val onlyOnce = new AtomicBoolean

    def runMaterialize(): Unit = {
      if (onlyOnce.compareAndSet(false, true))
        materialize()
    }

    override def offer(message: OutboundEnvelope): Boolean = {
      runMaterialize()
      queue.offer(message)
    }

    override def isEnabled: Boolean = true
  }

  final val ControlQueueIndex = 0
  final val LargeQueueIndex = 1
  final val OrdinaryQueueIndex = 2

  sealed trait StopSignal
  case object OutboundStreamStopIdleSignal extends RuntimeException("") with StopSignal with NoStackTrace
  case object OutboundStreamStopQuarantinedSignal extends RuntimeException("") with StopSignal with NoStackTrace

  final case class OutboundStreamMatValues(
      streamKillSwitch: OptionVal[SharedKillSwitch],
      completed: Future[Done],
      stopping: OptionVal[StopSignal])
}

/**
 * INTERNAL API
 *
 * Thread-safe, mutable holder for association state. Main entry point for remote destined message to a specific
 * remote address.
 */
@ccompatUsedUntil213
private[remote] class Association(
    val transport: ArteryTransport,
    val materializer: Materializer,
    val controlMaterializer: Materializer,
    override val remoteAddress: Address,
    override val controlSubject: ControlMessageSubject,
    largeMessageDestinations: WildcardIndex[NotUsed],
    priorityMessageDestinations: WildcardIndex[NotUsed],
    outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope])
    extends AbstractAssociation
    with OutboundContext {
  import Association._
  import FlightRecorderEvents._

  require(remoteAddress.port.nonEmpty)

  private val log = Logging(transport.system, getClass.getName)
  private def flightRecorder = transport.topLevelFlightRecorder

  override def settings = transport.settings
  private def advancedSettings = transport.settings.Advanced

  private val restartCounter =
    new RestartCounter(advancedSettings.OutboundMaxRestarts, advancedSettings.OutboundRestartTimeout)

  // We start with the raw wrapped queue and then it is replaced with the materialized value of
  // the `SendQueue` after materialization. Using same underlying queue. This makes it possible to
  // start sending (enqueuing) to the Association immediate after construction.

  def createQueue(capacity: Int, queueIndex: Int): Queue[OutboundEnvelope] = {
    val linked = queueIndex == ControlQueueIndex || queueIndex == LargeQueueIndex
    if (linked)
      new LinkedBlockingQueue[OutboundEnvelope](capacity) // less memory than ManyToOneConcurrentArrayQueue
    else
      new ManyToOneConcurrentArrayQueue[OutboundEnvelope](capacity)
  }

  private val outboundLanes = advancedSettings.OutboundLanes
  private val controlQueueSize = advancedSettings.OutboundControlQueueSize
  private val queueSize = advancedSettings.OutboundMessageQueueSize
  private val largeQueueSize = advancedSettings.OutboundLargeMessageQueueSize

  private[this] val queues: Array[SendQueue.ProducerApi[OutboundEnvelope]] = new Array(2 + outboundLanes)
  queues(ControlQueueIndex) = QueueWrapperImpl(createQueue(controlQueueSize, ControlQueueIndex)) // control stream
  queues(LargeQueueIndex) =
    if (transport.largeMessageChannelEnabled) // large messages stream
      QueueWrapperImpl(createQueue(largeQueueSize, LargeQueueIndex))
    else
      DisabledQueueWrapper

  (0 until outboundLanes).foreach { i =>
    queues(OrdinaryQueueIndex + i) = QueueWrapperImpl(createQueue(queueSize, OrdinaryQueueIndex + i)) // ordinary messages stream
  }
  @volatile private[this] var queuesVisibility = false

  private def controlQueue: SendQueue.ProducerApi[OutboundEnvelope] = queues(ControlQueueIndex)

  @volatile private[this] var _outboundControlIngress: OptionVal[OutboundControlIngress] = OptionVal.None
  @volatile private[this] var materializing = new CountDownLatch(1)
  @volatile private[this] var outboundCompressionAccess: Vector[OutboundCompressionAccess] = Vector.empty

  // keyed by stream queue index
  private[this] val streamMatValues = new AtomicReference(Map.empty[Int, OutboundStreamMatValues])

  private[this] val idleTimer = new AtomicReference[Option[Cancellable]](None)
  private[this] val stopQuarantinedTimer = new AtomicReference[Option[Cancellable]](None)

  private[remote] def changeActorRefCompression(table: CompressionTable[ActorRef]): Future[Done] =
    updateOutboundCompression(c => c.changeActorRefCompression(table))

  private[remote] def changeClassManifestCompression(table: CompressionTable[String]): Future[Done] =
    updateOutboundCompression(c => c.changeClassManifestCompression(table))

  private def clearOutboundCompression(): Future[Done] =
    updateOutboundCompression(c => c.clearCompression())

  private def updateOutboundCompression(action: OutboundCompressionAccess => Future[Done]): Future[Done] = {
    implicit val ec = transport.system.dispatchers.internalDispatcher
    val c = outboundCompressionAccess
    if (c.isEmpty) Future.successful(Done)
    else if (c.size == 1) action(c.head)
    else Future.sequence(c.map(action(_))).map(_ => Done)
  }

  private def clearInboundCompression(originUid: Long): Unit =
    transport.inboundCompressionAccess match {
      case OptionVal.Some(access) => access.closeCompressionFor(originUid)
      case _                      => // do nothing
    }

  private def deadletters = transport.system.deadLetters

  def outboundControlIngress: OutboundControlIngress = {
    _outboundControlIngress match {
      case OptionVal.Some(o) => o
      case OptionVal.None =>
        controlQueue match {
          case w: LazyQueueWrapper => w.runMaterialize()
          case _                   =>
        }
        // the outboundControlIngress may be accessed before the stream is materialized
        // using CountDownLatch to make sure that materialization is completed
        materializing.await(10, TimeUnit.SECONDS)
        _outboundControlIngress match {
          case OptionVal.Some(o) => o
          case OptionVal.None =>
            if (transport.isShutdown) throw ShuttingDown
            else throw new IllegalStateException("outboundControlIngress not initialized yet")
        }
    }
  }

  override def localAddress: UniqueAddress = transport.localAddress

  /**
   * Holds reference to shared state of Association - *access only via helper methods*
   */
  @volatile
  @silent
  private[this] var _sharedStateDoNotCallMeDirectly: AssociationState = AssociationState()

  /**
   * Helper method for access to underlying state via Unsafe
   *
   * @param oldState Previous state
   * @param newState Next state on transition
   * @return Whether the previous state matched correctly
   */
  @inline
  private[artery] def swapState(oldState: AssociationState, newState: AssociationState): Boolean =
    Unsafe.instance.compareAndSwapObject(this, AbstractAssociation.sharedStateOffset, oldState, newState)

  /**
   * @return Reference to current shared state
   */
  def associationState: AssociationState =
    Unsafe.instance.getObjectVolatile(this, AbstractAssociation.sharedStateOffset).asInstanceOf[AssociationState]

  def setControlIdleKillSwitch(killSwitch: OptionVal[SharedKillSwitch]): Unit = {
    val current = associationState
    swapState(current, current.withControlIdleKillSwitch(killSwitch))
  }

  def completeHandshake(peer: UniqueAddress): Future[Done] = {
    require(
      remoteAddress == peer.address,
      s"wrong remote address in completeHandshake, got ${peer.address}, expected $remoteAddress")
    val current = associationState

    current.uniqueRemoteAddressValue() match {
      case Some(`peer`) =>
        // handshake already completed
        Future.successful(Done)
      case _ =>
        // clear outbound compression, it's safe to do that several times if someone else
        // completes handshake at same time, but it's important to clear it before
        // we signal that the handshake is completed (uniqueRemoteAddressPromise.trySuccess)
        implicit val ec = transport.system.dispatchers.internalDispatcher
        clearOutboundCompression().map { _ =>
          current.uniqueRemoteAddressPromise.trySuccess(peer)
          current.uniqueRemoteAddressValue() match {
            case Some(`peer`) =>
            // our value
            case _ =>
              val newState = current.newIncarnation(Promise.successful(peer))
              if (swapState(current, newState)) {
                current.uniqueRemoteAddressValue() match {
                  case Some(old) =>
                    cancelStopQuarantinedTimer()
                    log.debug(
                      "Incarnation {} of association to [{}] with new UID [{}] (old UID [{}])",
                      newState.incarnation,
                      peer.address,
                      peer.uid,
                      old.uid)
                    clearInboundCompression(old.uid)
                  case None =>
                  // Failed, nothing to do
                }
                // if swap failed someone else completed before us, and that is fine
              }
          }
          Done
        }
    }
  }

  // OutboundContext
  override def sendControl(message: ControlMessage): Unit = {
    try {
      if (!transport.isShutdown && !isRemovedAfterQuarantined()) {
        if (associationState.isQuarantined()) {
          log.debug("Send control message [{}] to quarantined [{}]", Logging.messageClassName(message), remoteAddress)
          setupStopQuarantinedTimer()
        }
        outboundControlIngress.sendControlMessage(message)
      }
    } catch {
      case ShuttingDown => // silence it
    }
  }

  def send(message: Any, sender: OptionVal[ActorRef], recipient: OptionVal[RemoteActorRef]): Unit = {

    def createOutboundEnvelope(): OutboundEnvelope =
      outboundEnvelopePool.acquire().init(recipient, message.asInstanceOf[AnyRef], sender)

    // volatile read to see latest queue array
    @silent
    val unused = queuesVisibility

    def dropped(queueIndex: Int, qSize: Int, env: OutboundEnvelope): Unit = {
      val removed = isRemovedAfterQuarantined()
      if (removed) recipient match {
        case OptionVal.Some(ref) => ref.cachedAssociation = null // don't use this Association instance any more
        case OptionVal.None      =>
      }
      val reason =
        if (removed) "Due to removed unused quarantined association"
        else s"Due to overflow of send queue, size [$qSize]"
      transport.system.eventStream
        .publish(Dropped(message, reason, env.sender.getOrElse(ActorRef.noSender), recipient.getOrElse(deadletters)))

      flightRecorder.hiFreq(Transport_SendQueueOverflow, queueIndex)
      deadletters ! env
    }

    val state = associationState
    val quarantined = state.isQuarantined()
    val messageIsClearSystemMessageDelivery = message.isInstanceOf[ClearSystemMessageDelivery]

    // allow ActorSelectionMessage to pass through quarantine, to be able to establish interaction with new system
    if (message.isInstanceOf[ActorSelectionMessage] || !quarantined || messageIsClearSystemMessageDelivery) {
      if (quarantined && !messageIsClearSystemMessageDelivery) {
        log.debug(
          "Quarantine piercing attempt with message [{}] to [{}]",
          Logging.messageClassName(message),
          recipient.getOrElse(""))
        setupStopQuarantinedTimer()
      }
      try {
        val outboundEnvelope = createOutboundEnvelope()
        message match {
          case _: SystemMessage =>
            if (!controlQueue.offer(outboundEnvelope)) {
              quarantine(reason = s"Due to overflow of control queue, size [$controlQueueSize]")
              dropped(ControlQueueIndex, controlQueueSize, outboundEnvelope)
            }
          case ActorSelectionMessage(_: PriorityMessage, _, _) | _: ControlMessage | _: ClearSystemMessageDelivery =>
            // ActorSelectionMessage with PriorityMessage is used by cluster and remote failure detector heartbeating
            if (!controlQueue.offer(outboundEnvelope)) {
              dropped(ControlQueueIndex, controlQueueSize, outboundEnvelope)
            }
          case _: DaemonMsgCreate =>
            // DaemonMsgCreate is not a SystemMessage, but must be sent over the control stream because
            // remote deployment process depends on message ordering for DaemonMsgCreate and Watch messages.
            // First ordinary message may arrive earlier but then the resolve in the Decoder is retried
            // so that the first message can be delivered after the remote actor has been created.
            if (!controlQueue.offer(outboundEnvelope))
              dropped(ControlQueueIndex, controlQueueSize, outboundEnvelope)
          case _ =>
            val queueIndex = selectQueue(recipient)
            val queue = queues(queueIndex)
            val offerOk = queue.offer(outboundEnvelope)
            if (!offerOk)
              dropped(queueIndex, queueSize, outboundEnvelope)
        }
      } catch {
        case ShuttingDown => // silence it
      }
    } else if (log.isDebugEnabled)
      log.debug(
        "Dropping message [{}] from [{}] to [{}] due to quarantined system [{}]",
        Logging.messageClassName(message),
        sender.getOrElse(deadletters),
        recipient.getOrElse(recipient),
        remoteAddress)
  }

  private def selectQueue(recipient: OptionVal[RemoteActorRef]): Int = {
    recipient match {
      case OptionVal.Some(r) =>
        r.cachedSendQueueIndex match {
          case -1 =>
            // only happens when messages are sent to new remote destination
            // and is then cached on the RemoteActorRef
            val elements = r.path.elements
            val idx =
              if (priorityMessageDestinations.find(elements).isDefined) {
                log.debug("Using priority message stream for {}", r.path)
                ControlQueueIndex
              } else if (transport.largeMessageChannelEnabled && largeMessageDestinations.find(elements).isDefined) {
                log.debug("Using large message stream for {}", r.path)
                LargeQueueIndex
              } else if (outboundLanes == 1) {
                OrdinaryQueueIndex
              } else {
                // select lane based on destination, to preserve message order
                OrdinaryQueueIndex + (math.abs(r.path.uid % outboundLanes))
              }
            r.cachedSendQueueIndex = idx
            idx
          case idx => idx
        }

      case OptionVal.None =>
        OrdinaryQueueIndex
    }
  }

  override def isOrdinaryMessageStreamActive(): Boolean =
    isStreamActive(OrdinaryQueueIndex)

  def isStreamActive(queueIndex: Int): Boolean = {
    queues(queueIndex) match {
      case _: LazyQueueWrapper  => false
      case DisabledQueueWrapper => false
      case RemovedQueueWrapper  => false
      case _                    => true
    }
  }

  def sendTerminationHint(replyTo: ActorRef): Int = {
    if (!associationState.isQuarantined()) {
      val msg = ActorSystemTerminating(localAddress)
      var sent = 0
      queues.iterator.filter(q => q.isEnabled && !q.isInstanceOf[LazyQueueWrapper]).foreach { queue =>
        try {
          val envelope = outboundEnvelopePool.acquire().init(OptionVal.None, msg, OptionVal.Some(replyTo))

          queue.offer(envelope)
          sent += 1
        } catch {
          case ShuttingDown => // can be thrown if `offer` triggers new materialization
        }
      }
      sent
    } else 0
  }

  // OutboundContext
  override def quarantine(reason: String): Unit = {
    val uid = associationState.uniqueRemoteAddressValue().map(_.uid)
    quarantine(reason, uid, harmless = false)
  }

  @tailrec final def quarantine(reason: String, uid: Option[Long], harmless: Boolean): Unit = {
    uid match {
      case Some(u) =>
        val current = associationState
        current.uniqueRemoteAddressValue() match {
          case Some(peer) if peer.uid == u =>
            if (!current.isQuarantined(u)) {
              val newState = current.newQuarantined()
              if (swapState(current, newState)) {
                // quarantine state change was performed
                if (harmless) {
                  log.info(
                    "Association to [{}] having UID [{}] has been stopped. All " +
                    "messages to this UID will be delivered to dead letters. Reason: {}",
                    remoteAddress,
                    u,
                    reason)
                  transport.system.eventStream
                    .publish(GracefulShutdownQuarantinedEvent(UniqueAddress(remoteAddress, u), reason))
                } else {
                  log.warning(
                    "Association to [{}] with UID [{}] is irrecoverably failed. UID is now quarantined and all " +
                    "messages to this UID will be delivered to dead letters. " +
                    "Remote ActorSystem must be restarted to recover from this situation. Reason: {}",
                    remoteAddress,
                    u,
                    reason)
                  transport.system.eventStream.publish(QuarantinedEvent(UniqueAddress(remoteAddress, u)))
                }
                flightRecorder.loFreq(Transport_Quarantined, s"$remoteAddress - $u")
                clearOutboundCompression()
                clearInboundCompression(u)
                // end delivery of system messages to that incarnation after this point
                send(ClearSystemMessageDelivery(current.incarnation), OptionVal.None, OptionVal.None)
                if (!harmless) {
                  // try to tell the other system that we have quarantined it
                  sendControl(Quarantined(localAddress, peer))
                }
                setupStopQuarantinedTimer()
              } else
                quarantine(reason, uid, harmless) // recursive
            }
          case Some(peer) =>
            log.info(
              "Quarantine of [{}] ignored due to non-matching UID, quarantine requested for [{}] but current is [{}]. {}",
              remoteAddress,
              u,
              peer.uid,
              reason)
            send(ClearSystemMessageDelivery(current.incarnation - 1), OptionVal.None, OptionVal.None)
          case None =>
            log.info(
              "Quarantine of [{}] ignored because handshake not completed, quarantine request was for old incarnation. {}",
              remoteAddress,
              reason)
        }
      case None =>
        log.warning("Quarantine of [{}] ignored because unknown UID", remoteAddress)
    }

  }

  /**
   * After calling this no messages can be sent with this Association instance
   */
  def removedAfterQuarantined(): Unit = {
    if (!isRemovedAfterQuarantined()) {
      flightRecorder.loFreq(Transport_RemovedQuarantined, remoteAddress.toString)
      queues(ControlQueueIndex) = RemovedQueueWrapper

      if (transport.largeMessageChannelEnabled)
        queues(LargeQueueIndex) = RemovedQueueWrapper

      (0 until outboundLanes).foreach { i =>
        queues(OrdinaryQueueIndex + i) = RemovedQueueWrapper
      }
      queuesVisibility = true // volatile write for visibility of the queues array

      // cleanup
      _outboundControlIngress = OptionVal.None
      outboundCompressionAccess = Vector.empty
      cancelAllTimers()
      abortQuarantined()

      log.info("Unused association to [{}] removed after quarantine", remoteAddress)
    }
  }

  def isRemovedAfterQuarantined(): Boolean =
    queues(ControlQueueIndex) == RemovedQueueWrapper

  private def cancelStopQuarantinedTimer(): Unit = {
    val current = stopQuarantinedTimer.get
    current.foreach(_.cancel())
    stopQuarantinedTimer.compareAndSet(current, None)
  }

  private def setupStopQuarantinedTimer(): Unit = {
    cancelStopQuarantinedTimer()
    stopQuarantinedTimer.set(Some(transport.system.scheduler.scheduleOnce(advancedSettings.StopQuarantinedAfterIdle) {
      if (associationState.isQuarantined())
        abortQuarantined()
    }(transport.system.dispatchers.internalDispatcher)))
  }

  private def abortQuarantined(): Unit = {
    cancelIdleTimer()
    streamMatValues.get.foreach {
      case (queueIndex, OutboundStreamMatValues(killSwitch, _, _)) =>
        killSwitch match {
          case OptionVal.Some(k) =>
            setStopReason(queueIndex, OutboundStreamStopQuarantinedSignal)
            clearStreamKillSwitch(queueIndex, k)
            k.abort(OutboundStreamStopQuarantinedSignal)
          case OptionVal.None => // already aborted
        }
    }
  }

  private def cancelIdleTimer(): Unit = {
    val current = idleTimer.get
    current.foreach(_.cancel())
    idleTimer.compareAndSet(current, None)
  }

  private def setupIdleTimer(): Unit = {
    if (idleTimer.get.isEmpty) {
      val StopIdleOutboundAfter = settings.Advanced.StopIdleOutboundAfter
      val QuarantineIdleOutboundAfter = settings.Advanced.QuarantineIdleOutboundAfter
      val interval = StopIdleOutboundAfter / 2
      val initialDelay = settings.Advanced.Tcp.ConnectionTimeout.max(StopIdleOutboundAfter) + 1.second
      val task =
        transport.system.scheduler.scheduleWithFixedDelay(initialDelay, interval) { () =>
          val lastUsedDurationNanos = System.nanoTime() - associationState.lastUsedTimestamp.get
          if (lastUsedDurationNanos >= QuarantineIdleOutboundAfter.toNanos && !associationState.isQuarantined()) {
            // If idle longer than quarantine-idle-outbound-after and the low frequency HandshakeReq
            // doesn't get through it will be quarantined to cleanup lingering associations to crashed systems.
            quarantine(s"Idle longer than quarantine-idle-outbound-after [${QuarantineIdleOutboundAfter.pretty}]")
          } else if (lastUsedDurationNanos >= StopIdleOutboundAfter.toNanos) {
            streamMatValues.get.foreach {
              case (queueIndex, OutboundStreamMatValues(streamKillSwitch, _, stopping)) =>
                if (isStreamActive(queueIndex) && stopping.isEmpty) {
                  if (queueIndex != ControlQueueIndex) {
                    streamKillSwitch match {
                      case OptionVal.Some(k) =>
                        // for non-control streams we can stop the entire stream
                        log.info("Stopping idle outbound stream [{}] to [{}]", queueIndex, remoteAddress)
                        flightRecorder.loFreq(Transport_StopIdleOutbound, s"$remoteAddress - $queueIndex")
                        setStopReason(queueIndex, OutboundStreamStopIdleSignal)
                        clearStreamKillSwitch(queueIndex, k)
                        k.abort(OutboundStreamStopIdleSignal)
                      case OptionVal.None => // already aborted
                    }

                  } else {
                    // only stop the transport parts of the stream because SystemMessageDelivery stage has
                    // state (seqno) and system messages might be sent at the same time
                    associationState.controlIdleKillSwitch match {
                      case OptionVal.Some(killSwitch) =>
                        log.info("Stopping idle outbound control stream to [{}]", remoteAddress)
                        flightRecorder.loFreq(Transport_StopIdleOutbound, s"$remoteAddress - $queueIndex")
                        setControlIdleKillSwitch(OptionVal.None)
                        killSwitch.abort(OutboundStreamStopIdleSignal)
                      case OptionVal.None => // already stopped
                    }
                  }
                }
            }
          }
        }(transport.system.dispatcher)

      if (!idleTimer.compareAndSet(None, Some(task))) {
        // another thread did same thing and won
        task.cancel()
      }
    }
  }

  private def cancelAllTimers(): Unit = {
    cancelIdleTimer()
    cancelStopQuarantinedTimer()
  }

  private def sendToDeadLetters[T](pending: Vector[OutboundEnvelope]): Unit = {
    pending.foreach(transport.system.deadLetters ! _)
  }

  /**
   * Called once after construction when the `Association` instance
   * wins the CAS in the `AssociationRegistry`. It will materialize
   * the streams. It is possible to sending (enqueuing) to the association
   * before this method is called.
   *
   * @throws ShuttingDown if called while the transport is shutting down
   */
  def associate(): Unit = {
    if (!controlQueue.isInstanceOf[QueueWrapper])
      throw new IllegalStateException("associate() must only be called once")
    runOutboundStreams()
  }

  private def runOutboundStreams(): Unit = {

    // it's important to materialize the outboundControl stream first,
    // so that outboundControlIngress is ready when stages for all streams start
    runOutboundControlStream()
    runOutboundOrdinaryMessagesStream()

    if (transport.largeMessageChannelEnabled)
      runOutboundLargeMessagesStream()
  }

  private def runOutboundControlStream(): Unit = {
    if (transport.isShutdown) throw ShuttingDown
    log.debug("Starting outbound control stream to [{}]", remoteAddress)

    val wrapper = getOrCreateQueueWrapper(ControlQueueIndex, queueSize)
    queues(ControlQueueIndex) = wrapper // use new underlying queue immediately for restarts
    queuesVisibility = true // volatile write for visibility of the queues array

    val streamKillSwitch = KillSwitches.shared("outboundControlStreamKillSwitch")

    def sendQueuePostStop[T](pending: Vector[OutboundEnvelope]): Unit = {
      sendToDeadLetters(pending)
      val systemMessagesCount = pending.count(env => env.message.isInstanceOf[SystemMessage])
      if (systemMessagesCount > 0)
        quarantine(s"SendQueue stopped with [$systemMessagesCount] pending system messages.")
    }

    val (queueValue, (control, completed)) =
      Source
        .fromGraph(new SendQueue[OutboundEnvelope](sendQueuePostStop))
        .via(streamKillSwitch.flow)
        .toMat(transport.outboundControl(this))(Keep.both)
        .run()(materializer)

    queueValue.inject(wrapper.queue)
    // replace with the materialized value, still same underlying queue
    queues(ControlQueueIndex) = queueValue
    queuesVisibility = true // volatile write for visibility of the queues array
    _outboundControlIngress = OptionVal.Some(control)
    materializing.countDown()

    updateStreamMatValues(ControlQueueIndex, streamKillSwitch, completed)
    setupIdleTimer()
    attachOutboundStreamRestart(
      "Outbound control stream",
      ControlQueueIndex,
      controlQueueSize,
      completed,
      () => runOutboundControlStream())
  }

  private def getOrCreateQueueWrapper(queueIndex: Int, capacity: Int): QueueWrapper = {
    @silent
    val unused = queuesVisibility // volatile read to see latest queues array
    queues(queueIndex) match {
      case existing: QueueWrapper => existing
      case _                      =>
        // use new queue for restarts
        QueueWrapperImpl(createQueue(capacity, queueIndex))
    }
  }

  private def runOutboundOrdinaryMessagesStream(): Unit = {
    if (transport.isShutdown) throw ShuttingDown

    val streamKillSwitch = KillSwitches.shared("outboundMessagesKillSwitch")

    if (outboundLanes == 1) {
      log.debug("Starting outbound message stream to [{}]", remoteAddress)
      val queueIndex = OrdinaryQueueIndex
      val wrapper = getOrCreateQueueWrapper(queueIndex, queueSize)
      queues(queueIndex) = wrapper // use new underlying queue immediately for restarts
      queuesVisibility = true // volatile write for visibility of the queues array

      val (queueValue, _, changeCompression, completed) =
        Source
          .fromGraph(new SendQueue[OutboundEnvelope](sendToDeadLetters))
          .via(streamKillSwitch.flow)
          .viaMat(transport.outboundTestFlow(this))(Keep.both)
          .toMat(transport.outbound(this))({ case ((a, b), (c, d)) => (a, b, c, d) }) // "keep all, exploded"
          .run()(materializer)

      queueValue.inject(wrapper.queue)
      // replace with the materialized value, still same underlying queue
      queues(queueIndex) = queueValue
      queuesVisibility = true // volatile write for visibility of the queues array
      outboundCompressionAccess = Vector(changeCompression)

      updateStreamMatValues(OrdinaryQueueIndex, streamKillSwitch, completed)
      attachOutboundStreamRestart(
        "Outbound message stream",
        OrdinaryQueueIndex,
        queueSize,
        completed,
        () => runOutboundOrdinaryMessagesStream())

    } else {
      log.debug("Starting outbound message stream to [{}] with [{}] lanes", remoteAddress, outboundLanes)
      val wrappers = (0 until outboundLanes).map { i =>
        val wrapper = getOrCreateQueueWrapper(OrdinaryQueueIndex + i, queueSize)
        queues(OrdinaryQueueIndex + i) = wrapper // use new underlying queue immediately for restarts
        queuesVisibility = true // volatile write for visibility of the queues array
        wrapper
      }.toVector

      val lane = Source
        .fromGraph(new SendQueue[OutboundEnvelope](sendToDeadLetters))
        .via(streamKillSwitch.flow)
        .via(transport.outboundTestFlow(this))
        .viaMat(transport.outboundLane(this))(Keep.both)
        .watchTermination()(Keep.both)
        // recover to avoid error logging by MergeHub
        .recoverWithRetries(-1, { case _: Throwable => Source.empty })
        .mapMaterializedValue {
          case ((q, c), w) => (q, c, w)
        }

      val (mergeHub, transportSinkCompleted) = MergeHub
        .source[EnvelopeBuffer]
        .via(streamKillSwitch.flow)
        .toMat(transport.outboundTransportSink(this))(Keep.both)
        .run()(materializer)

      val values: Vector[(SendQueue.QueueValue[OutboundEnvelope], Encoder.OutboundCompressionAccess, Future[Done])] =
        (0 until outboundLanes).iterator
          .map { _ =>
            lane.to(mergeHub).run()(materializer)
          }
          .to(Vector)

      val (queueValues, compressionAccessValues, laneCompletedValues) = values.unzip3

      implicit val ec = transport.system.dispatchers.internalDispatcher

      // tear down all parts if one part fails or completes
      Future.firstCompletedOf(laneCompletedValues).failed.foreach { reason =>
        streamKillSwitch.abort(reason)
      }
      (laneCompletedValues :+ transportSinkCompleted).foreach(_.foreach { _ =>
        streamKillSwitch.shutdown()
      })

      val allCompleted = Future.sequence(laneCompletedValues).flatMap(_ => transportSinkCompleted)

      queueValues.zip(wrappers).zipWithIndex.foreach {
        case ((q, w), i) =>
          q.inject(w.queue)
          queues(OrdinaryQueueIndex + i) = q // replace with the materialized value, still same underlying queue
      }
      queuesVisibility = true // volatile write for visibility of the queues array

      outboundCompressionAccess = compressionAccessValues

      attachOutboundStreamRestart(
        "Outbound message stream",
        OrdinaryQueueIndex,
        queueSize,
        allCompleted,
        () => runOutboundOrdinaryMessagesStream())
    }
  }

  private def runOutboundLargeMessagesStream(): Unit = {
    if (transport.isShutdown) throw ShuttingDown
    log.debug("Starting outbound large message stream to [{}]", remoteAddress)
    val wrapper = getOrCreateQueueWrapper(LargeQueueIndex, largeQueueSize)
    queues(LargeQueueIndex) = wrapper // use new underlying queue immediately for restarts
    queuesVisibility = true // volatile write for visibility of the queues array

    val streamKillSwitch = KillSwitches.shared("outboundLargeMessagesKillSwitch")

    val (queueValue, completed) = Source
      .fromGraph(new SendQueue[OutboundEnvelope](sendToDeadLetters))
      .via(streamKillSwitch.flow)
      .via(transport.outboundTestFlow(this))
      .toMat(transport.outboundLarge(this))(Keep.both)
      .run()(materializer)

    queueValue.inject(wrapper.queue)
    // replace with the materialized value, still same underlying queue
    queues(LargeQueueIndex) = queueValue
    queuesVisibility = true // volatile write for visibility of the queues array

    updateStreamMatValues(LargeQueueIndex, streamKillSwitch, completed)
    attachOutboundStreamRestart(
      "Outbound large message stream",
      LargeQueueIndex,
      largeQueueSize,
      completed,
      () => runOutboundLargeMessagesStream())
  }

  private def attachOutboundStreamRestart(
      streamName: String,
      queueIndex: Int,
      queueCapacity: Int,
      streamCompleted: Future[Done],
      restart: () => Unit): Unit = {

    def lazyRestart(): Unit = {
      flightRecorder.loFreq(Transport_RestartOutbound, s"$remoteAddress - $streamName")
      outboundCompressionAccess = Vector.empty
      if (queueIndex == ControlQueueIndex) {
        materializing = new CountDownLatch(1)
        _outboundControlIngress = OptionVal.None
      }
      // LazyQueueWrapper will invoke the `restart` function when first message is offered
      val wrappedRestartFun: () => Unit = () => {
        restart()
      }

      if (!isRemovedAfterQuarantined())
        queues(queueIndex) = LazyQueueWrapper(createQueue(queueCapacity, queueIndex), wrappedRestartFun)

      queuesVisibility = true // volatile write for visibility of the queues array
    }

    implicit val ec = materializer.executionContext
    streamCompleted.foreach { _ =>
      // shutdown as expected
      // countDown the latch in case threads are waiting on the latch in outboundControlIngress method
      materializing.countDown()
    }
    streamCompleted.failed.foreach {
      case ArteryTransport.ShutdownSignal =>
        // shutdown as expected
        cancelAllTimers()
        // countDown the latch in case threads are waiting on the latch in outboundControlIngress method
        materializing.countDown()
      case cause if transport.isShutdown || isRemovedAfterQuarantined() =>
        // don't restart after shutdown, but log some details so we notice
        // for the TCP transport the ShutdownSignal is "converted" to StreamTcpException
        if (!cause.isInstanceOf[StreamTcpException])
          log.error(cause, s"{} to [{}] failed after shutdown. {}", streamName, remoteAddress, cause.getMessage)
        cancelAllTimers()
        // countDown the latch in case threads are waiting on the latch in outboundControlIngress method
        materializing.countDown()
      case _: AeronTerminated =>
        // shutdown already in progress
        cancelAllTimers()
      case _: AbruptTerminationException =>
        // ActorSystem shutdown
        cancelAllTimers()
      case cause =>
        // it might have been stopped as expected due to idle or quarantine
        // for the TCP transport the exception is "converted" to StreamTcpException
        val stoppedIdle = cause == OutboundStreamStopIdleSignal ||
          getStopReason(queueIndex).contains(OutboundStreamStopIdleSignal)
        val stoppedQuarantined = cause == OutboundStreamStopQuarantinedSignal ||
          getStopReason(queueIndex).contains(OutboundStreamStopQuarantinedSignal)

        // for some cases restart unconditionally, without counting restarts
        val bypassRestartCounter = cause match {
          case _: GaveUpMessageException => true
          case _                         => stoppedIdle || stoppedQuarantined
        }

        if (queueIndex == ControlQueueIndex && !stoppedQuarantined) {
          cause match {
            case _: HandshakeTimeoutException => // ok, quarantine not possible without UID
            case _                            =>
              // Must quarantine in case all system messages haven't been delivered.
              // See also comment in the stoppedIdle case below
              quarantine(s"Outbound control stream restarted. $cause")
          }
        }

        def isConnectException: Boolean =
          cause.isInstanceOf[StreamTcpException] && cause.getCause != null && cause.getCause
            .isInstanceOf[ConnectException]

        if (stoppedIdle) {
          log.debug("{} to [{}] was idle and stopped. It will be restarted if used again.", streamName, remoteAddress)
          lazyRestart()
        } else if (stoppedQuarantined) {
          log.debug(
            "{} to [{}] was quarantined and stopped. It will be restarted if used again.",
            streamName,
            remoteAddress)
          lazyRestart()
        } else if (bypassRestartCounter || restartCounter.restart()) {
          // ConnectException may happen repeatedly and are already logged in connectionFlowWithRestart
          if (isConnectException)
            log.debug("{} to [{}] failed. Restarting it. {}", streamName, remoteAddress, cause)
          else
            log.warning("{} to [{}] failed. Restarting it. {}", streamName, remoteAddress, cause)
          lazyRestart()
        } else {
          log.error(
            cause,
            s"{} to [{}] failed and restarted {} times within {} seconds. Terminating system. ${cause.getMessage}",
            streamName,
            remoteAddress,
            advancedSettings.OutboundMaxRestarts,
            advancedSettings.OutboundRestartTimeout.toSeconds)
          cancelAllTimers()
          transport.system.terminate()
        }
    }
  }

  private def updateStreamMatValues(
      streamId: Int,
      streamKillSwitch: SharedKillSwitch,
      completed: Future[Done]): Unit = {
    implicit val ec = materializer.executionContext
    updateStreamMatValues(streamId, OutboundStreamMatValues(OptionVal.Some(streamKillSwitch), completed.recover {
      case _ => Done
    }, stopping = OptionVal.None))
  }

  @tailrec private def updateStreamMatValues(streamId: Int, values: OutboundStreamMatValues): Unit = {
    val prev = streamMatValues.get()
    if (!streamMatValues.compareAndSet(prev, prev + (streamId -> values))) {
      updateStreamMatValues(streamId, values)
    }
  }

  @tailrec private def setStopReason(streamId: Int, stopSignal: StopSignal): Unit = {
    val prev = streamMatValues.get()
    prev.get(streamId) match {
      case Some(v) =>
        if (!streamMatValues.compareAndSet(prev, prev.updated(streamId, v.copy(stopping = OptionVal.Some(stopSignal)))))
          setStopReason(streamId, stopSignal)
      case None => throw new IllegalStateException(s"Expected streamMatValues for [$streamId]")
    }
  }

  private def getStopReason(streamId: Int): OptionVal[StopSignal] = {
    streamMatValues.get().get(streamId) match {
      case Some(OutboundStreamMatValues(_, _, stopping)) => stopping
      case None                                          => OptionVal.None
    }
  }

  // after it has been used we remove the kill switch to cleanup some memory,
  // not a "leak" but a KillSwitch is rather heavy
  @tailrec private def clearStreamKillSwitch(streamId: Int, old: SharedKillSwitch): Unit = {
    val prev = streamMatValues.get()
    prev.get(streamId) match {
      case Some(v) =>
        if (v.streamKillSwitch.isDefined && (v.streamKillSwitch.get eq old)) {
          if (!streamMatValues.compareAndSet(prev, prev.updated(streamId, v.copy(streamKillSwitch = OptionVal.None))))
            clearStreamKillSwitch(streamId, old)
        }
      case None => throw new IllegalStateException(s"Expected streamMatValues for [$streamId]")
    }
  }

  /**
   * Exposed for orderly shutdown purposes, can not be trusted except for during shutdown as streams may restart.
   * Will complete successfully even if one of the stream completion futures failed
   */
  def streamsCompleted: Future[Done] = {
    implicit val ec = materializer.executionContext
    Future
      .sequence(streamMatValues.get().values.map {
        case OutboundStreamMatValues(_, done, _) => done
      })
      .map(_ => Done)
  }

  override def toString: String =
    s"Association($localAddress -> $remoteAddress with $associationState)"

}

/**
 * INTERNAL API
 */
private[remote] class AssociationRegistry(createAssociation: Address => Association) {
  private[this] val associationsByAddress = new AtomicReference[Map[Address, Association]](Map.empty)
  private[this] val associationsByUid = new AtomicReference[ImmutableLongMap[Association]](ImmutableLongMap.empty)

  /**
   * @throws ShuttingDown if called while the transport is shutting down
   */
  @tailrec final def association(remoteAddress: Address): Association = {
    val currentMap = associationsByAddress.get
    currentMap.get(remoteAddress) match {
      case Some(existing) => existing
      case None =>
        val newAssociation = createAssociation(remoteAddress)
        val newMap = currentMap.updated(remoteAddress, newAssociation)
        if (associationsByAddress.compareAndSet(currentMap, newMap)) {
          newAssociation.associate() // start it, only once
          newAssociation
        } else
          association(remoteAddress) // lost CAS, retry
    }
  }

  def association(uid: Long): OptionVal[Association] =
    associationsByUid.get.get(uid)

  /**
   * @throws ShuttingDown if called while the transport is shutting down
   */
  @tailrec final def setUID(peer: UniqueAddress): Association = {
    // Don't create a new association via this method. It's supposed to exist unless it was removed after quarantined.
    val a = association(peer.address)

    val currentMap = associationsByUid.get
    currentMap.get(peer.uid) match {
      case OptionVal.Some(previous) =>
        if (previous eq a)
          // associationsByUid Map already contains the right association
          a
        else
          // make sure we don't overwrite same UID with different association
          throw new IllegalArgumentException(s"UID collision old [$previous] new [$a]")
      case _ =>
        // update associationsByUid Map with the uid -> association
        val newMap = currentMap.updated(peer.uid, a)
        if (associationsByUid.compareAndSet(currentMap, newMap))
          a
        else
          setUID(peer) // lost CAS, retry
    }
  }

  def allAssociations: Set[Association] =
    associationsByAddress.get.values.toSet

  def removeUnusedQuarantined(after: FiniteDuration): Unit = {
    removeUnusedQuarantinedByAddress(after)
    removeUnusedQuarantinedByUid(after)
  }

  @tailrec private def removeUnusedQuarantinedByAddress(after: FiniteDuration): Unit = {
    val now = System.nanoTime()
    val afterNanos = after.toNanos
    val currentMap = associationsByAddress.get
    val remove = currentMap.foldLeft(Map.empty[Address, Association]) {
      case (acc, (address, association)) =>
        val state = association.associationState
        if (state.isQuarantined() && ((now - state.lastUsedTimestamp.get) >= afterNanos))
          acc.updated(address, association)
        else
          acc
    }
    if (remove.nonEmpty) {
      val newMap = currentMap -- remove.keysIterator
      if (associationsByAddress.compareAndSet(currentMap, newMap))
        remove.valuesIterator.foreach(_.removedAfterQuarantined())
      else
        removeUnusedQuarantinedByAddress(after) // CAS fail, recursive
    }
  }

  @tailrec private def removeUnusedQuarantinedByUid(after: FiniteDuration): Unit = {
    val now = System.nanoTime()
    val afterNanos = after.toNanos
    val currentMap = associationsByUid.get
    var remove = Map.empty[Long, Association]
    currentMap.keysIterator.foreach { uid =>
      val association = currentMap.get(uid).get
      val state = association.associationState
      if (state.isQuarantined() && ((now - state.lastUsedTimestamp.get) >= afterNanos))
        remove = remove.updated(uid, association)
    }
    if (remove.nonEmpty) {
      val newMap = remove.keysIterator.foldLeft(currentMap)((acc, uid) => acc.remove(uid))
      if (associationsByUid.compareAndSet(currentMap, newMap))
        remove.valuesIterator.foreach(_.removedAfterQuarantined())
      else
        removeUnusedQuarantinedByUid(after) // CAS fail, recursive
    }
  }
}
